package com.atguigu.realtime.util

import com.atguigu.realtime.bean.AlertInfo
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, Index}
import org.apache.spark.rdd.RDD

import scala.collection.JavaConverters._

/**
 * Author atguigu
 * Date 2020/10/21 14:14
 */
/**
 * Helpers for writing documents to Elasticsearch through the Jest HTTP client.
 *
 * A single [[JestClientFactory]] is configured once when this object is initialised;
 * every write obtains a client from the factory and shuts it down when finished.
 */
object ESUtil {
    
    // Shared factory, configured once below with the cluster endpoints and timeouts.
    val factory: JestClientFactory = new JestClientFactory
    // Elasticsearch HTTP endpoints handed to the Jest client configuration.
    val uris: java.util.List[String] =
        ("http://hadoop102:9200" :: "http://hadoop103:9200" :: "http://hadoop104:9200" :: Nil).asJava
    val conf: HttpClientConfig = new HttpClientConfig.Builder(uris)
        .maxTotalConnection(100)
        .connTimeout(10000)
        .readTimeout(10000)
        .build()
    factory.setHttpClientConfig(conf)
    
    def main(args: Array[String]): Unit = {
    
    }
    
    /**
     * Writes a batch of documents to Elasticsearch with a single bulk request.
     *
     * Each element of `sources` is either a bare document, or a `(id, document)` pair;
     * for a pair, `id.toString` is used as the document id.
     *
     * @param index   target index name (documents use the `_doc` type)
     * @param sources documents to index; an empty iterator results in no request at all
     */
    def insertBulk(index: String, sources: Iterator[Object]): Unit = {
        // Nothing to send: skip creating a client and issuing an empty bulk request.
        if (sources.hasNext) {
            val client: JestClient = factory.getObject
            try {
                val bulkBuilder = new Bulk.Builder()
                    .defaultIndex(index)
                    .defaultType("_doc")
                sources.foreach {
                    case (id: Any, source) =>
                        bulkBuilder.addAction(new Index.Builder(source).id(id.toString).build())
                    case source =>
                        bulkBuilder.addAction(new Index.Builder(source).build())
                }
                // NOTE(review): the result of execute is not inspected, so per-item
                // failures reported by Elasticsearch are not surfaced here.
                client.execute(bulkBuilder.build())
            } finally {
                // Always release the client, even when building or executing the request throws.
                client.shutdownClient()
            }
        }
    }
    
    /**
     * Writes a single document to Elasticsearch.
     *
     * @param index  target index name (the document uses the `_doc` type)
     * @param source document to index
     * @param id     document id; defaults to `null`, which is passed to the builder unchanged
     */
    def insertSingle(index: String, source: Object, id: String = null): Unit = {
        val client: JestClient = factory.getObject
        try {
            val action = new Index.Builder(source)
                .index(index)
                .`type`("_doc")
                .id(id)
                .build()
            client.execute(action)
        } finally {
            // Always release the client, even when the request throws.
            client.shutdownClient()
        }
    }
    
    /** Adds `saveToES` to an `RDD[AlertInfo]`. */
    implicit class RichEs(rdd: RDD[AlertInfo]) {
        /**
         * Writes every partition of the RDD to Elasticsearch with one bulk request per partition.
         *
         * The document id is `mid:ts / 1000 / 60`, so records sharing the same `mid` and the
         * same value of `ts / 1000 / 60` map to the same document id.
         *
         * @param index target index name
         */
        def saveToES(index: String): Unit = {
            rdd.foreachPartition((it: Iterator[AlertInfo]) => {
                // Use the caller-supplied index rather than a hard-coded name.
                ESUtil.insertBulk(
                    index,
                    it.map(info => (info.mid + ":" + info.ts / 1000 / 60, info)))
            })
        }
    }
}

